/*
fpcd.c
------

Program fpcd is a server program designed to interact with a running
FPC client.  Start the program "fpcd" on all server nodes.  UDP
broadcasts from the client are picked up by all fpcd processes on the
local subnet; those computers then function as nodes in the
distributed FPC build.

This software has been created by The Genome Sequence Centre, BC
Cancer Research Centre, Vancouver ('the creators').  The creators
hereby grant permission to use this software and its documentation for
academic and non-commercial purposes without fee at the user's own
risk on the basis set out below.  The users shall not re-distribute
this software nor re-distribute a modified version.  The creators
neither undertake nor accept any duty whether contractual or otherwise
in connection with the software, its use or the use of any derivative,
and makes no representations or warranties, express or implied,
concerning the software, its suitability, fitness for a particular
purpose or non-infringement.  In no event shall the creators or
authors of the software be responsible or liable for any loss or
damage whatsoever arising in any way directly or indirectly out of the
use of this software or its derivatives, even if advised of the
possibility of such damage.


This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of the
License, or (at your option) any later version.  This program is
distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public
License for more details.  You should have received a copy of the
GNU General Public License along with this program; if not, write
to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,
Boston, MA 02111-1307 USA


In all papers and correspondence, please use the following reference
for distributed FPC:

"Distributed and Multithreaded FPC code implemented at the BC Genome
Sequence Centre.

A parallelized version of FPC for the assembly of fingerprint contigs.  
Bioinformatics, (to be published). Ness, S.R., Terpstra, W., Krzywinski,
M., Marra M.A., Jones, S.J.M*. Genome Sequence Centre, British Columbia
Cancer Agency, 600 West 10th Avenue, Vancouver, BC V5Z 4E6, Canada."

*/


#include <errno.h>
#include <signal.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <sys/types.h>
#include <sys/file.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <pthread.h>
#include <pwd.h>
#include <syslog.h>
#include <unistd.h>

#include "clam.h"
#include "parallel/parallel.h"
#include "parallel/serial.h"

/* TCP connection to the client of the current job (opened in main()).
 * clientSocketMutex guards writes to it, and the O_NONBLOCK toggling done
 * around those writes, in remoteWorkerProcedure(); main() also hands the
 * mutex to slaveDispatcherProcedure(), presumably for the same purpose. */
pthread_mutex_t clientSocketMutex;
int             clientSocket;

/* Write exactly `len` bytes from `buf` to descriptor `fd`.
 *
 * write(2) on a stream socket may transfer fewer bytes than requested, or
 * fail with EINTR before transferring anything; this keeps writing until
 * the whole buffer has gone out.  Returns the number of bytes written
 * (== len on success, less if write() reported 0), or -1 on error with
 * errno set by the failing write().
 */
static ssize_t fpcdWriteAll(int fd, const void* buf, size_t len)
{
	const char* cursor = (const char*)buf;
	size_t      left   = len;

	while (left > 0)
	{
		ssize_t n = write(fd, cursor, left);

		if (n < 0)
		{
			if (errno == EINTR) continue;
			return -1;
		}
		if (n == 0) break; /* no progress possible; report the short count */

		cursor += n;
		left   -= (size_t)n;
	}
	return (ssize_t)(len - left);
}

/* Worker thread body for the daemon side of a distributed build.
 *
 * Repeatedly takes a clone index ("orders") from acquireWorkerOrders(),
 * computes that clone's row with Zcreate_node_parallel(), and sends the
 * row's entries for higher-numbered clones back to the client as one
 * packet of 32-bit words in network byte order:
 *
 *   DAEMON_DONE_CLONE, orders, count,
 *   then `count` triples of (i2, exponent, match)
 *
 * arg is unused.  Returns (void*)STATUS_OK once acquireWorkerOrders()
 * yields -1.  On an algorithm or network failure it keeps calling
 * acquireWorkerOrders() with STATUS_FAIL_ALGORITHM / STATUS_FAIL_NETWORK
 * until that returns -1, then returns that status.  Exits the process if
 * the packet buffer cannot be allocated.
 */
void* remoteWorkerProcedure(void* arg)
{
	int        orders;
	int        i;
	int        packetSize;
	ssize_t    wrote;
	u_int32_t* packet;
	u_int32_t  counter;
	u_int32_t* w;
	sigset_t   set;

	(void)arg;

	/* Block SIGALRM, SIGPIPE, SIGTERM and SIGINT in this worker thread, as
	 * the dispatcher design requires (see parallel.c). */
	sigemptyset(&set);
	sigaddset(&set, SIGALRM);
	sigaddset(&set, SIGPIPE);
	sigaddset(&set, SIGTERM);
	sigaddset(&set, SIGINT);
	pthread_sigmask(SIG_BLOCK, &set, 0);

	while (1)
	{
		orders = acquireWorkerOrders(STATUS_OK);
		if (orders == -1) return (void*)STATUS_OK;

		if (!Zcreate_node_parallel(orders))
		{
			/* An error occurred: consume all remaining tasks, reporting the
			 * failure, so that we exit cleanly. */
			while (orders != -1)
				orders = acquireWorkerOrders(STATUS_FAIL_ALGORITHM);

			return (void*)STATUS_FAIL_ALGORITHM;
		}

		/* Both passes over the row run under its lock, so the count in the
		 * header agrees with the number of triples serialized. */
		pthread_mutex_lock(&ZZ.matrix[orders].lock);

		counter = 0;
		for (i = ZZ.matrix[orders].first; i != -1; i = ZZ.pool[i].next)
			if (ZZ.pool[i].i2 > orders)
				counter++;

		packetSize = (counter*3) + 3;
		packet = (u_int32_t*)malloc(packetSize * sizeof(u_int32_t));
		if (!packet)
		{
			printf("out of memory sending packet - quitting\n"); fflush(stdout);
			syslog(LOG_ERR, "out of memory sending packet - quitting");
			exit(1);
		}

		w = packet;
		*w++ = htonl(DAEMON_DONE_CLONE);
		*w++ = htonl(orders);
		*w++ = htonl(counter);
		for (i = ZZ.matrix[orders].first; i != -1; i = ZZ.pool[i].next)
		{
			if (ZZ.pool[i].i2 > orders)
			{
				*w++ = htonl(ZZ.pool[i].i2);
				*w++ = htonl(ZZ.pool[i].exponent);
				*w++ = htonl(ZZ.pool[i].match);
			}
		}
		pthread_mutex_unlock(&ZZ.matrix[orders].lock);

		/* main() leaves the shared socket non-blocking while the job runs;
		 * switch it to blocking for this send so the whole packet goes out,
		 * then restore.  The mutex keeps the toggle and the write atomic
		 * with respect to other holders of clientSocketMutex. */
		pthread_mutex_lock(&clientSocketMutex);
		fcntl(clientSocket, F_SETFL, fcntl(clientSocket, F_GETFL, 0) & ~O_NONBLOCK);
		wrote = fpcdWriteAll(clientSocket, packet, sizeof(u_int32_t)*packetSize);
		fcntl(clientSocket, F_SETFL, fcntl(clientSocket, F_GETFL, 0) | O_NONBLOCK);
		pthread_mutex_unlock(&clientSocketMutex);
		free(packet);

		if (wrote != (ssize_t)(sizeof(u_int32_t)*packetSize))
		{
			while (orders != -1)
				orders = acquireWorkerOrders(STATUS_FAIL_NETWORK);

			return (void*)STATUS_FAIL_NETWORK;
		}
	}
}

/*
void respawn(int argc, char** argv)
{
	syslog(LOG_NOTICE, "respawning\n");
	closelog();
	
	execvp(argv[0], argv);
	syslog(LOG_ERR, "cannot respawn\n");
}
*/

/* Return 1 if `addr` is an address of this host, 0 otherwise.
 *
 * The probe binds a TCP socket to `addr` with an ephemeral port (port 0).
 * bind() succeeds only for an address assigned to this machine (or
 * INADDR_ANY).  The probe socket is always closed before returning.
 * Exits the process (status 1) if no socket can be created at all.
 */
int addressLocal(struct in_addr addr)
{
	struct sockaddr_in sa;
	int                sock;
	int                isLocal;

	/* Zero the whole structure, including sin_zero: historically some
	 * BSD-derived stacks reject bind() when it contains garbage. */
	memset(&sa, 0, sizeof(sa));
	sa.sin_family = AF_INET;
	sa.sin_addr   = addr;
	sa.sin_port   = htons(0);

	if ((sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1)
	{
		perror("obtaining sock - quitting");
		exit(1);
	}

	isLocal = (bind(sock, (struct sockaddr*)&sa, sizeof(sa)) == 0);
	close(sock);
	return isLocal;
}

#ifndef FPCD_JOBS_LOG
/* Job log appended to by writeStartDaemon() and writeDoneDaemon().  The
 * default keeps the path this file has been using; override it at build
 * time, e.g. -DFPCD_JOBS_LOG='"/var/log/fpcd/jobs.log"'. */
#define FPCD_JOBS_LOG "/home/fpc/gaurav/jobs.log"
#endif

/* Append a START_DAEMON record for this process to the job log:
 *
 *   <epoch> <YYYY/MM/DD-HH:MM:SS> START_DAEMON <user> <pid> <threadCount>
 *
 * The record is written while an exclusive lockf() lock is held on the
 * file.  Does nothing if the log cannot be opened, and compiles to nothing
 * when NO_DAEMON_LOG is defined.  The user field is "unknown" if the real
 * uid has no passwd entry.
 */
void writeStartDaemon(int threadCount)
{
#ifndef NO_DAEMON_LOG
	FILE*          logFile;
	char           timeBuf[100];
	time_t         now;
	struct tm*     local;
	struct passwd* pw;

	if ((logFile = fopen(FPCD_JOBS_LOG, "a")) == (FILE*)0)
		return;

	lockf(fileno(logFile), F_LOCK, 0);

	now   = time(0);
	local = localtime(&now);
	if (local)
		strftime(timeBuf, sizeof(timeBuf), "%Y/%m/%d-%H:%M:%S", local);
	else
		snprintf(timeBuf, sizeof(timeBuf), "?");

	/* getpwuid() returns NULL when the uid has no passwd entry */
	pw = getpwuid(getuid());
	fprintf(logFile, "%li %s START_DAEMON %s %i %i\n",
			(long)now, timeBuf, pw ? pw->pw_name : "unknown",
			(int)getpid(), threadCount);

	/* Flush before unlocking: otherwise stdio keeps the record in its
	 * buffer and only writes it in fclose(), after the lock is released. */
	fflush(logFile);
	lockf(fileno(logFile), F_ULOCK, 0);
	fclose(logFile);
#else
	(void)threadCount;
#endif
}

#ifndef FPCD_JOBS_LOG
/* Job log appended to by writeStartDaemon() and writeDoneDaemon().  The
 * default keeps the path this file has been using; override it at build
 * time, e.g. -DFPCD_JOBS_LOG='"/var/log/fpcd/jobs.log"'. */
#define FPCD_JOBS_LOG "/home/fpc/gaurav/jobs.log"
#endif

/* Append a STOP_DAEMON record for this process to the job log:
 *
 *   <epoch> <YYYY/MM/DD-HH:MM:SS> STOP_DAEMON <user> <pid>
 *
 * The record is written while an exclusive lockf() lock is held on the
 * file.  Does nothing if the log cannot be opened, and compiles to nothing
 * when NO_DAEMON_LOG is defined.  The user field is "unknown" if the real
 * uid has no passwd entry.
 */
void writeDoneDaemon(void)
{
#ifndef NO_DAEMON_LOG
	FILE*          logFile;
	time_t         now;
	char           timeBuf[100];
	struct tm*     local;
	struct passwd* pw;

	if ((logFile = fopen(FPCD_JOBS_LOG, "a")) == (FILE*)0)
		return;

	lockf(fileno(logFile), F_LOCK, 0);

	now   = time(0);
	local = localtime(&now);
	if (local)
		strftime(timeBuf, sizeof(timeBuf), "%Y/%m/%d-%H:%M:%S", local);
	else
		snprintf(timeBuf, sizeof(timeBuf), "?");

	/* getpwuid() returns NULL when the uid has no passwd entry */
	pw = getpwuid(getuid());
	fprintf(logFile, "%li %s STOP_DAEMON %s %i\n",
			(long)now, timeBuf, pw ? pw->pw_name : "unknown",
			(int)getpid());

	/* Flush before unlocking: otherwise stdio keeps the record in its
	 * buffer and only writes it in fclose(), after the lock is released. */
	fflush(logFile);
	lockf(fileno(logFile), F_ULOCK, 0);
	fclose(logFile);
#endif
}

/* SIGTERM/SIGINT handler installed by main(): append the STOP_DAEMON
 * record to the job log, then end the process with status 0.  The signal
 * number itself is not used.
 *
 * NOTE(review): writeDoneDaemon() (stdio, lockf, getpwuid) and exit() are
 * not async-signal-safe, so calling them from a handler is undefined if
 * the signal lands inside a non-reentrant call.  A sig_atomic_t flag
 * polled by main()'s loop would be the safe design. */
void cleanExit(int sig)
{
	(void)sig;

	writeDoneDaemon();
	exit(0);
}



/* This starts up the main loop:
 *	listen for requests
 *	retrieve clone information
 *	process clone information
 *	send clone information
 *	repeat
 */
int main(int argc, char** argv)
{
	/* Error policy: failures that make further service impossible (no
	 * socket, cannot bind the daemon port, no memory for the thread array,
	 * cannot start a worker) terminate the process.  A failure inside a
	 * single job abandons that job and goes back to listening.  Details
	 * go to syslog and to "fpcd.txt" in the working directory (stderr if
	 * that file cannot be created).  The outer loop never ends normally;
	 * SIGTERM/SIGINT stop the process through cleanExit(). */
	int                threadCount;
	pthread_t*         threads;
	void*              value;
	int                i;

	socklen_t          client_len;
	struct sockaddr_in client;
	u_int32_t          magic;
	u_int32_t          packet[2];
	int                daemon;
	struct sockaddr_in addr;
	u_int32_t          done;
	int                out;
	int                invalid;

	FILE *fpc = fopen("fpcd.txt","w");

	/* Never hand a NULL stream to the fprintf() calls below; line-buffer
	 * the file so each message reaches disk when written, since the daemon
	 * normally runs until it is killed. */
	if (fpc)
		setvbuf(fpc, NULL, _IOLBF, 0);
	else
		fpc = stderr;

	(void)argc;
	(void)argv;

	/* Lower our priority by making ourselves nicer */
	nice(5);

	openlog("fpcd", 0, LOG_DAEMON);

	/* One worker thread is gentler on the host and its users.  Raise it
	 * (e.g. to sysconf(_SC_NPROCESSORS_CONF)) for slightly more efficient
	 * processing. */
	threadCount = 1;

	if (!(threads = (pthread_t*)malloc(sizeof(pthread_t) * threadCount)))
	{
		syslog(LOG_ERR, "not enough memory for thread array - quitting\n");
		fprintf(fpc,"not enough memory for thread array - quitting\n");
		exit(1);
	}

	initMutexes(); /* does all the common dispatcher mutexes */
	pthread_mutex_init(&clientSocketMutex, 0);

	syslog(LOG_NOTICE, "booted - %i threads\n", threadCount);
	printf("Daemon ready\n"); fflush(stdout);

	/* Prepare the exit cleanup routine */
	signal(SIGTERM, &cleanExit);
	signal(SIGINT,  &cleanExit);
	signal(SIGPIPE, SIG_IGN);

	/* Write startup data to the job log */
	writeStartDaemon(threadCount);

	while (1)
	{
		/* Open our listen UDP port */
		if ((daemon = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
		{
			syslog(LOG_ERR, "%s: getting a UDP socket - quitting\n",
				   strerror(errno));
			fprintf(fpc,"%s: getting a UDP socket - quitting\n", strerror(errno));
			writeDoneDaemon();
			exit(1);
		}

		/* Listen on DAEMON_PORT on every local interface, so client
		 * broadcasts are received. */
		memset(&addr, 0, sizeof(addr));
		addr.sin_family      = AF_INET;
		addr.sin_addr.s_addr = htonl(INADDR_ANY);
		addr.sin_port        = htons(DAEMON_PORT);

		if (bind(daemon, (struct sockaddr*)&addr, sizeof(addr)) < 0)
		{
			syslog(LOG_ERR,
				   "%s: binding the UDP socket to port %i - quitting\n",
				   strerror(errno), DAEMON_PORT);
			writeDoneDaemon();
			exit(1);
		}

		seteuid(2); /* if root: we have the port we want, become daemon */

		/* Wait for a valid request from a remote client */
		while (1)
		{
			client_len = sizeof(client);
			if (recvfrom(daemon, packet, sizeof(packet), 0,
						 (struct sockaddr*)&client, &client_len) != (ssize_t)sizeof(packet))
			{
				syslog(LOG_WARNING, "fractional noise on daemon port\n");
				continue;
			}

			if (ntohl(packet[0]) != MAGIC)
			{
				syslog(LOG_WARNING, "non-magical noise on our daemon port\n");
				continue;
			}

			if (addressLocal(client.sin_addr))
			{
				syslog(LOG_NOTICE, "local machine passed over as daemon\n");
				continue;
			}

			syslog(LOG_NOTICE, "sparse matrix computation request from: %s:%u\n",
				   inet_ntoa(client.sin_addr),
				   ntohs(client.sin_port));

			break;
		}

		/* No need to hear broadcast requests any more - we have a client */
		if (close(daemon) == -1)
		{
			syslog(LOG_ERR, "%s: closing daemon port - quitting\n",
				   strerror(errno));
			writeDoneDaemon();
			exit(1);
		}

		/* Attempt to connect to the client's TCP port */
		if ((clientSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1)
		{
			syslog(LOG_ERR, "%s: getting a TCP socket - quitting\n",
				   strerror(errno));
			writeDoneDaemon();
			exit(1);
		}

		/* client sin_addr & sin_family already set by recvfrom; the client's
		 * TCP port arrived as the second 32-bit word of the request. */
		client.sin_port = htons(ntohl(packet[1]));
		if (connect(clientSocket, (struct sockaddr*)&client, sizeof(client)) < 0)
		{
			syslog(LOG_WARNING, "%s: connecting to client\n",
				   strerror(errno));
			close(clientSocket);
			continue;
		}

		/* Make sure the client knows to send us the magic number */
		if (readLoop(clientSocket, &magic, sizeof(magic)) != sizeof(magic))
		{
			syslog(LOG_WARNING, "client doesn't believe in magic - restarting\n");
			close(clientSocket);
			continue;
		}

		if (ntohl(magic) != MAGIC)
		{
			syslog(LOG_WARNING, "client didn't send right magic number (wrong version?)\n");
			close(clientSocket);
			continue;
		}

		/**** Starting to do the work ****/
		if (readPz(&Pz, clientSocket) == -1)
		{
			syslog(LOG_ERR, "%s: during Pz transfer", strerror(errno));
			fprintf(fpc, "%s: during Pz transfer\n", strerror(errno));
			close(clientSocket);
			continue;
		}

		if (readProj(&Proj, clientSocket) == -1)
		{
			syslog(LOG_ERR, "%s: during Proj transfer", strerror(errno));
			fprintf(fpc, "%s: during Proj transfer\n", strerror(errno));
			close(clientSocket);
			continue;
		}

		if (readCpM(&Cp, clientSocket) == -1)
		{
			syslog(LOG_ERR, "%s: during Cp transfer", strerror(errno));
			fprintf(fpc, "%s: during Cp transfer\n", strerror(errno));
			close(clientSocket);
			continue;
		}

		if (readAcedata(clientSocket) == -1)
		{
			syslog(LOG_ERR, "%s: during acedata transfer", strerror(errno));
			fprintf(fpc, "%s: during acedata transfer\n", strerror(errno));
			close(clientSocket);
			releaseCzFastCache();
			continue;
		}

		if (readMatrix(clientSocket) == -1)
		{
			syslog(LOG_ERR, "%s: during matrix transfer", strerror(errno));
			fprintf(fpc, "%s: during matrix transfer\n", strerror(errno));
			close(clientSocket);
			releaseCzFastCache();
			acedata = arrayReCreate(acedata, 0, CLONE);
			continue;
		}

		if (readMarkerdata(clientSocket) == -1)
		{
			syslog(LOG_ERR, "%s: during markerdata transfer", strerror(errno));
			fprintf(fpc, "%s: during markerdata transfer\n", strerror(errno));
			close(clientSocket);
			releaseCzFastCache();
			continue;
		}

		if (readDistrib(clientSocket) == -1)
		{
			syslog(LOG_ERR, "%s: during distrib transfer", strerror(errno));
			fprintf(fpc, "%s: during distrib transfer\n", strerror(errno));
			close(clientSocket);
			releaseCzFastCache();
			continue;
		}

		if (readLoadCzFastCache(clientSocket) == -1)
		{
			syslog(LOG_ERR, "%s: during cache transfer", strerror(errno));
			fprintf(fpc, "%s: during cache transfer\n", strerror(errno));
			close(clientSocket);
			continue;
		}

		syslog(LOG_NOTICE, "job involves %i clones\n", ZZ.size);

		/* Joining a thread that was never created is undefined, so a
		 * failed pthread_create() is fatal. */
		for (i = 0; i < threadCount; i++)
		{
			if (pthread_create(&threads[i], 0, remoteWorkerProcedure, NULL) != 0)
			{
				syslog(LOG_ERR, "cannot create worker thread - quitting\n");
				fprintf(fpc, "cannot create worker thread - quitting\n");
				writeDoneDaemon();
				exit(1);
			}
		}

		/* Go non-blocking for children */
		fcntl(clientSocket, F_SETFL, fcntl(clientSocket, F_GETFL, 0) | O_NONBLOCK);
		out = slaveDispatcherProcedure(threadCount, clientSocket, &clientSocketMutex);

		if (out < 0) invalid = out;
		else         invalid = STATUS_OK;

		for (i = 0; i < threadCount; i++)
		{
			pthread_join(threads[i], &value);
			/* Workers return a status int smuggled through void*; go via
			 * intptr_t so the conversion is well-defined on LP64. */
			if ((int)(intptr_t)value < 0) invalid = (int)(intptr_t)value;
		}

		/* Completed - return to blocking mode */
		fcntl(clientSocket, F_SETFL, fcntl(clientSocket, F_GETFL, 0) & ~O_NONBLOCK);

		if (invalid == STATUS_OK)
		{
			/* NOTE(review): in this branch invalid == STATUS_OK, so the
			 * DAEMON_REPORTING_ERROR arm is only reachable if STATUS_OK is
			 * non-zero, and on failure (else branch) no message is sent at
			 * all.  Confirm which behaviour the client expects. */
			if (invalid) done = htonl(DAEMON_REPORTING_ERROR);
			else         done = htonl(DAEMON_WANTS_TO_QUIT);

			/* I don't really care at this point what happens to the client */
			write(clientSocket, &done, sizeof(done));

			syslog(LOG_NOTICE, "transaction complete: %lli clones processed, %i cache misses\n",
				   jobCount, out);
		}
		else
		{
			syslog(LOG_NOTICE, "transaction failed: %lli clones processed\n", jobCount);
		}

		/* Close exactly once: a second close() could hit a descriptor
		 * number that has meanwhile been reused. */
		if (close(clientSocket) == -1)
			syslog(LOG_ERR, "%s: closing client link\n",
				   strerror(errno));

		releaseCzFastCache();
		acedata = arrayReCreate(acedata, 0, CLONE);
		Zdestroy_instance(&ZZ);
	}

	/* Not reached: the loop above only ends through exit(). */
	free(threads);
	exit(0);
}
